home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_csv.py < prev    next >
Text File  |  2005-10-18  |  31KB  |  865 lines

  1. # -*- coding: iso-8859-1 -*-
  2. # Copyright (C) 2001,2002 Python Software Foundation
  3. # csv package unit tests
  4.  
  5. import sys
  6. import os
  7. import unittest
  8. from StringIO import StringIO
  9. import tempfile
  10. import csv
  11. import gc
  12. from test import test_support
  13.  
  14. class Test_Csv(unittest.TestCase):
  15.     """
  16.     Test the underlying C csv parser in ways that are not appropriate
  17.     from the high level interface. Further tests of this nature are done
  18.     in TestDialectRegistry.
  19.     """
  20.     def test_reader_arg_valid(self):
  21.         self.assertRaises(TypeError, csv.reader)
  22.         self.assertRaises(TypeError, csv.reader, None)
  23.         self.assertRaises(AttributeError, csv.reader, [], bad_attr = 0)
  24.         self.assertRaises(csv.Error, csv.reader, [], 'foo')
  25.         class BadClass:
  26.             def __init__(self):
  27.                 raise IOError
  28.         self.assertRaises(IOError, csv.reader, [], BadClass)
  29.         self.assertRaises(TypeError, csv.reader, [], None)
  30.         class BadDialect:
  31.             bad_attr = 0
  32.         self.assertRaises(AttributeError, csv.reader, [], BadDialect)
  33.  
  34.     def test_writer_arg_valid(self):
  35.         self.assertRaises(TypeError, csv.writer)
  36.         self.assertRaises(TypeError, csv.writer, None)
  37.         self.assertRaises(AttributeError, csv.writer, StringIO(), bad_attr = 0)
  38.  
  39.     def _test_attrs(self, obj):
  40.         self.assertEqual(obj.dialect.delimiter, ',')
  41.         obj.dialect.delimiter = '\t'
  42.         self.assertEqual(obj.dialect.delimiter, '\t')
  43.         self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter')
  44.         self.assertRaises(TypeError, setattr, obj.dialect,
  45.                           'lineterminator', None)
  46.         obj.dialect.escapechar = None
  47.         self.assertEqual(obj.dialect.escapechar, None)
  48.         self.assertRaises(TypeError, delattr, obj.dialect, 'quoting')
  49.         self.assertRaises(TypeError, setattr, obj.dialect, 'quoting', None)
  50.         obj.dialect.quoting = csv.QUOTE_MINIMAL
  51.         self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
  52.  
  53.     def test_reader_attrs(self):
  54.         self._test_attrs(csv.reader([]))
  55.  
  56.     def test_writer_attrs(self):
  57.         self._test_attrs(csv.writer(StringIO()))
  58.  
  59.     def _write_test(self, fields, expect, **kwargs):
  60.         fd, name = tempfile.mkstemp()
  61.         fileobj = os.fdopen(fd, "w+b")
  62.         try:
  63.             writer = csv.writer(fileobj, **kwargs)
  64.             writer.writerow(fields)
  65.             fileobj.seek(0)
  66.             self.assertEqual(fileobj.read(),
  67.                              expect + writer.dialect.lineterminator)
  68.         finally:
  69.             fileobj.close()
  70.             os.unlink(name)
  71.  
  72.     def test_write_arg_valid(self):
  73.         self.assertRaises(csv.Error, self._write_test, None, '')
  74.         self._write_test((), '')
  75.         self._write_test([None], '""')
  76.         self.assertRaises(csv.Error, self._write_test,
  77.                           [None], None, quoting = csv.QUOTE_NONE)
  78.         # Check that exceptions are passed up the chain
  79.         class BadList:
  80.             def __len__(self):
  81.                 return 10;
  82.             def __getitem__(self, i):
  83.                 if i > 2:
  84.                     raise IOError
  85.         self.assertRaises(IOError, self._write_test, BadList(), '')
  86.         class BadItem:
  87.             def __str__(self):
  88.                 raise IOError
  89.         self.assertRaises(IOError, self._write_test, [BadItem()], '')
  90.  
  91.     def test_write_bigfield(self):
  92.         # This exercises the buffer realloc functionality
  93.         bigstring = 'X' * 50000
  94.         self._write_test([bigstring,bigstring], '%s,%s' % \
  95.                          (bigstring, bigstring))
  96.  
  97.     def test_write_quoting(self):
  98.         self._write_test(['a','1','p,q'], 'a,1,"p,q"')
  99.         self.assertRaises(csv.Error,
  100.                           self._write_test,
  101.                           ['a','1','p,q'], 'a,1,"p,q"',
  102.                           quoting = csv.QUOTE_NONE)
  103.         self._write_test(['a','1','p,q'], 'a,1,"p,q"',
  104.                          quoting = csv.QUOTE_MINIMAL)
  105.         self._write_test(['a','1','p,q'], '"a",1,"p,q"',
  106.                          quoting = csv.QUOTE_NONNUMERIC)
  107.         self._write_test(['a','1','p,q'], '"a","1","p,q"',
  108.                          quoting = csv.QUOTE_ALL)
  109.  
  110.     def test_write_escape(self):
  111.         self._write_test(['a','1','p,q'], 'a,1,"p,q"',
  112.                          escapechar='\\')
  113. # FAILED - needs to be fixed [am]:
  114. #        self._write_test(['a','1','p,"q"'], 'a,1,"p,\\"q\\"',
  115. #                         escapechar='\\', doublequote = 0)
  116.         self._write_test(['a','1','p,q'], 'a,1,p\\,q',
  117.                          escapechar='\\', quoting = csv.QUOTE_NONE)
  118.  
  119.     def test_writerows(self):
  120.         class BrokenFile:
  121.             def write(self, buf):
  122.                 raise IOError
  123.         writer = csv.writer(BrokenFile())
  124.         self.assertRaises(IOError, writer.writerows, [['a']])
  125.         fd, name = tempfile.mkstemp()
  126.         fileobj = os.fdopen(fd, "w+b")
  127.         try:
  128.             writer = csv.writer(fileobj)
  129.             self.assertRaises(TypeError, writer.writerows, None)
  130.             writer.writerows([['a','b'],['c','d']])
  131.             fileobj.seek(0)
  132.             self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
  133.         finally:
  134.             fileobj.close()
  135.             os.unlink(name)
  136.  
  137.     def _read_test(self, input, expect, **kwargs):
  138.         reader = csv.reader(input, **kwargs)
  139.         result = list(reader)
  140.         self.assertEqual(result, expect)
  141.  
  142.     def test_read_oddinputs(self):
  143.         self._read_test([], [])
  144.         self._read_test([''], [[]])
  145.         self.assertRaises(csv.Error, self._read_test,
  146.                           ['"ab"c'], None, strict = 1)
  147.         # cannot handle null bytes for the moment
  148.         self.assertRaises(csv.Error, self._read_test,
  149.                           ['ab\0c'], None, strict = 1)
  150.         self._read_test(['"ab"c'], [['abc']], doublequote = 0)
  151.  
  152.     def test_read_eol(self):
  153.         self._read_test(['a,b'], [['a','b']])
  154.         self._read_test(['a,b\n'], [['a','b']])
  155.         self._read_test(['a,b\r\n'], [['a','b']])
  156.         self._read_test(['a,b\r'], [['a','b']])
  157.         self.assertRaises(csv.Error, self._read_test, ['a,b\rc,d'], [])
  158.         self.assertRaises(csv.Error, self._read_test, ['a,b\nc,d'], [])
  159.         self.assertRaises(csv.Error, self._read_test, ['a,b\r\nc,d'], [])
  160.  
  161.     def test_read_escape(self):
  162.         self._read_test(['a,\\b,c'], [['a', '\\b', 'c']], escapechar='\\')
  163.         self._read_test(['a,b\\,c'], [['a', 'b,c']], escapechar='\\')
  164.         self._read_test(['a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
  165.         self._read_test(['a,"b,\\c"'], [['a', 'b,\\c']], escapechar='\\')
  166.         self._read_test(['a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
  167.         self._read_test(['a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')
  168.  
  169.     def test_read_bigfield(self):
  170.         # This exercises the buffer realloc functionality
  171.         bigstring = 'X' * 50000
  172.         bigline = '%s,%s' % (bigstring, bigstring)
  173.         self._read_test([bigline], [[bigstring, bigstring]])
  174.  
  175. class TestDialectRegistry(unittest.TestCase):
  176.     def test_registry_badargs(self):
  177.         self.assertRaises(TypeError, csv.list_dialects, None)
  178.         self.assertRaises(TypeError, csv.get_dialect)
  179.         self.assertRaises(csv.Error, csv.get_dialect, None)
  180.         self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
  181.         self.assertRaises(TypeError, csv.unregister_dialect)
  182.         self.assertRaises(csv.Error, csv.unregister_dialect, None)
  183.         self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
  184.         self.assertRaises(TypeError, csv.register_dialect, None)
  185.         self.assertRaises(TypeError, csv.register_dialect, None, None)
  186.         self.assertRaises(TypeError, csv.register_dialect, "nonesuch", None)
  187.         class bogus:
  188.             def __init__(self):
  189.                 raise KeyError
  190.         self.assertRaises(KeyError, csv.register_dialect, "nonesuch", bogus)
  191.  
  192.     def test_registry(self):
  193.         class myexceltsv(csv.excel):
  194.             delimiter = "\t"
  195.         name = "myexceltsv"
  196.         expected_dialects = csv.list_dialects() + [name]
  197.         expected_dialects.sort()
  198.         csv.register_dialect(name, myexceltsv)
  199.         try:
  200.             self.failUnless(isinstance(csv.get_dialect(name), myexceltsv))
  201.             got_dialects = csv.list_dialects()
  202.             got_dialects.sort()
  203.             self.assertEqual(expected_dialects, got_dialects)
  204.         finally:
  205.             csv.unregister_dialect(name)
  206.  
  207.     def test_incomplete_dialect(self):
  208.         class myexceltsv(csv.Dialect):
  209.             delimiter = "\t"
  210.         self.assertRaises(csv.Error, myexceltsv)
  211.  
  212.     def test_space_dialect(self):
  213.         class space(csv.excel):
  214.             delimiter = " "
  215.             quoting = csv.QUOTE_NONE
  216.             escapechar = "\\"
  217.  
  218.         fd, name = tempfile.mkstemp()
  219.         fileobj = os.fdopen(fd, "w+b")
  220.         try:
  221.             fileobj.write("abc def\nc1ccccc1 benzene\n")
  222.             fileobj.seek(0)
  223.             rdr = csv.reader(fileobj, dialect=space())
  224.             self.assertEqual(rdr.next(), ["abc", "def"])
  225.             self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
  226.         finally:
  227.             fileobj.close()
  228.             os.unlink(name)
  229.  
  230.     def test_dialect_apply(self):
  231.         class testA(csv.excel):
  232.             delimiter = "\t"
  233.         class testB(csv.excel):
  234.             delimiter = ":"
  235.         class testC(csv.excel):
  236.             delimiter = "|"
  237.  
  238.         csv.register_dialect('testC', testC)
  239.         try:
  240.             fd, name = tempfile.mkstemp()
  241.             fileobj = os.fdopen(fd, "w+b")
  242.             try:
  243.                 writer = csv.writer(fileobj)
  244.                 writer.writerow([1,2,3])
  245.                 fileobj.seek(0)
  246.                 self.assertEqual(fileobj.read(), "1,2,3\r\n")
  247.             finally:
  248.                 fileobj.close()
  249.                 os.unlink(name)
  250.  
  251.             fd, name = tempfile.mkstemp()
  252.             fileobj = os.fdopen(fd, "w+b")
  253.             try:
  254.                 writer = csv.writer(fileobj, testA)
  255.                 writer.writerow([1,2,3])
  256.                 fileobj.seek(0)
  257.                 self.assertEqual(fileobj.read(), "1\t2\t3\r\n")
  258.             finally:
  259.                 fileobj.close()
  260.                 os.unlink(name)
  261.  
  262.             fd, name = tempfile.mkstemp()
  263.             fileobj = os.fdopen(fd, "w+b")
  264.             try:
  265.                 writer = csv.writer(fileobj, dialect=testB())
  266.                 writer.writerow([1,2,3])
  267.                 fileobj.seek(0)
  268.                 self.assertEqual(fileobj.read(), "1:2:3\r\n")
  269.             finally:
  270.                 fileobj.close()
  271.                 os.unlink(name)
  272.  
  273.             fd, name = tempfile.mkstemp()
  274.             fileobj = os.fdopen(fd, "w+b")
  275.             try:
  276.                 writer = csv.writer(fileobj, dialect='testC')
  277.                 writer.writerow([1,2,3])
  278.                 fileobj.seek(0)
  279.                 self.assertEqual(fileobj.read(), "1|2|3\r\n")
  280.             finally:
  281.                 fileobj.close()
  282.                 os.unlink(name)
  283.  
  284.             fd, name = tempfile.mkstemp()
  285.             fileobj = os.fdopen(fd, "w+b")
  286.             try:
  287.                 writer = csv.writer(fileobj, dialect=testA, delimiter=';')
  288.                 writer.writerow([1,2,3])
  289.                 fileobj.seek(0)
  290.                 self.assertEqual(fileobj.read(), "1;2;3\r\n")
  291.             finally:
  292.                 fileobj.close()
  293.                 os.unlink(name)
  294.  
  295.         finally:
  296.             csv.unregister_dialect('testC')
  297.  
  298.     def test_bad_dialect(self):
  299.         # Unknown parameter
  300.         self.assertRaises(AttributeError, csv.reader, [], bad_attr = 0)
  301.         # Bad values
  302.         self.assertRaises(TypeError, csv.reader, [], delimiter = None)
  303.         self.assertRaises(TypeError, csv.reader, [], quoting = -1)
  304.         self.assertRaises(TypeError, csv.reader, [], quoting = 100)
  305.  
  306. class TestCsvBase(unittest.TestCase):
  307.     def readerAssertEqual(self, input, expected_result):
  308.         fd, name = tempfile.mkstemp()
  309.         fileobj = os.fdopen(fd, "w+b")
  310.         try:
  311.             fileobj.write(input)
  312.             fileobj.seek(0)
  313.             reader = csv.reader(fileobj, dialect = self.dialect)
  314.             fields = list(reader)
  315.             self.assertEqual(fields, expected_result)
  316.         finally:
  317.             fileobj.close()
  318.             os.unlink(name)
  319.  
  320.     def writerAssertEqual(self, input, expected_result):
  321.         fd, name = tempfile.mkstemp()
  322.         fileobj = os.fdopen(fd, "w+b")
  323.         try:
  324.             writer = csv.writer(fileobj, dialect = self.dialect)
  325.             writer.writerows(input)
  326.             fileobj.seek(0)
  327.             self.assertEqual(fileobj.read(), expected_result)
  328.         finally:
  329.             fileobj.close()
  330.             os.unlink(name)
  331.  
  332. class TestDialectExcel(TestCsvBase):
  333.     dialect = 'excel'
  334.  
  335.     def test_single(self):
  336.         self.readerAssertEqual('abc', [['abc']])
  337.  
  338.     def test_simple(self):
  339.         self.readerAssertEqual('1,2,3,4,5', [['1','2','3','4','5']])
  340.  
  341.     def test_blankline(self):
  342.         self.readerAssertEqual('', [])
  343.  
  344.     def test_empty_fields(self):
  345.         self.readerAssertEqual(',', [['', '']])
  346.  
  347.     def test_singlequoted(self):
  348.         self.readerAssertEqual('""', [['']])
  349.  
  350.     def test_singlequoted_left_empty(self):
  351.         self.readerAssertEqual('"",', [['','']])
  352.  
  353.     def test_singlequoted_right_empty(self):
  354.         self.readerAssertEqual(',""', [['','']])
  355.  
  356.     def test_single_quoted_quote(self):
  357.         self.readerAssertEqual('""""', [['"']])
  358.  
  359.     def test_quoted_quotes(self):
  360.         self.readerAssertEqual('""""""', [['""']])
  361.  
  362.     def test_inline_quote(self):
  363.         self.readerAssertEqual('a""b', [['a""b']])
  364.  
  365.     def test_inline_quotes(self):
  366.         self.readerAssertEqual('a"b"c', [['a"b"c']])
  367.  
  368.     def test_quotes_and_more(self):
  369.         self.readerAssertEqual('"a"b', [['ab']])
  370.  
  371.     def test_lone_quote(self):
  372.         self.readerAssertEqual('a"b', [['a"b']])
  373.  
  374.     def test_quote_and_quote(self):
  375.         self.readerAssertEqual('"a" "b"', [['a "b"']])
  376.  
  377.     def test_space_and_quote(self):
  378.         self.readerAssertEqual(' "a"', [[' "a"']])
  379.  
  380.     def test_quoted(self):
  381.         self.readerAssertEqual('1,2,3,"I think, therefore I am",5,6',
  382.                                [['1', '2', '3',
  383.                                  'I think, therefore I am',
  384.                                  '5', '6']])
  385.  
  386.     def test_quoted_quote(self):
  387.         self.readerAssertEqual('1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"',
  388.                                [['1', '2', '3',
  389.                                  '"I see," said the blind man',
  390.                                  'as he picked up his hammer and saw']])
  391.  
  392.     def test_quoted_nl(self):
  393.         input = '''\
  394. 1,2,3,"""I see,""
  395. said the blind man","as he picked up his
  396. hammer and saw"
  397. 9,8,7,6'''
  398.         self.readerAssertEqual(input,
  399.                                [['1', '2', '3',
  400.                                    '"I see,"\nsaid the blind man',
  401.                                    'as he picked up his\nhammer and saw'],
  402.                                 ['9','8','7','6']])
  403.  
  404.     def test_dubious_quote(self):
  405.         self.readerAssertEqual('12,12,1",', [['12', '12', '1"', '']])
  406.  
  407.     def test_null(self):
  408.         self.writerAssertEqual([], '')
  409.  
  410.     def test_single(self):
  411.         self.writerAssertEqual([['abc']], 'abc\r\n')
  412.  
  413.     def test_simple(self):
  414.         self.writerAssertEqual([[1, 2, 'abc', 3, 4]], '1,2,abc,3,4\r\n')
  415.  
  416.     def test_quotes(self):
  417.         self.writerAssertEqual([[1, 2, 'a"bc"', 3, 4]], '1,2,"a""bc""",3,4\r\n')
  418.  
  419.     def test_quote_fieldsep(self):
  420.         self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')
  421.  
  422.     def test_newlines(self):
  423.         self.writerAssertEqual([[1, 2, 'a\nbc', 3, 4]], '1,2,"a\nbc",3,4\r\n')
  424.  
  425. class EscapedExcel(csv.excel):
  426.     quoting = csv.QUOTE_NONE
  427.     escapechar = '\\'
  428.  
  429. class TestEscapedExcel(TestCsvBase):
  430.     dialect = EscapedExcel()
  431.  
  432.     def test_escape_fieldsep(self):
  433.         self.writerAssertEqual([['abc,def']], 'abc\\,def\r\n')
  434.  
  435.     def test_read_escape_fieldsep(self):
  436.         self.readerAssertEqual('abc\\,def\r\n', [['abc,def']])
  437.  
  438. class QuotedEscapedExcel(csv.excel):
  439.     quoting = csv.QUOTE_NONNUMERIC
  440.     escapechar = '\\'
  441.  
  442. class TestQuotedEscapedExcel(TestCsvBase):
  443.     dialect = QuotedEscapedExcel()
  444.  
  445.     def test_write_escape_fieldsep(self):
  446.         self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')
  447.  
  448.     def test_read_escape_fieldsep(self):
  449.         self.readerAssertEqual('"abc\\,def"\r\n', [['abc,def']])
  450.  
  451. class TestDictFields(unittest.TestCase):
  452.     ### "long" means the row is longer than the number of fieldnames
  453.     ### "short" means there are fewer elements in the row than fieldnames
  454.     def test_write_simple_dict(self):
  455.         fd, name = tempfile.mkstemp()
  456.         fileobj = os.fdopen(fd, "w+b")
  457.         try:
  458.             writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
  459.             writer.writerow({"f1": 10, "f3": "abc"})
  460.             fileobj.seek(0)
  461.             self.assertEqual(fileobj.read(), "10,,abc\r\n")
  462.         finally:
  463.             fileobj.close()
  464.             os.unlink(name)
  465.  
  466.     def test_write_no_fields(self):
  467.         fileobj = StringIO()
  468.         self.assertRaises(TypeError, csv.DictWriter, fileobj)
  469.  
  470.     def test_read_dict_fields(self):
  471.         fd, name = tempfile.mkstemp()
  472.         fileobj = os.fdopen(fd, "w+b")
  473.         try:
  474.             fileobj.write("1,2,abc\r\n")
  475.             fileobj.seek(0)
  476.             reader = csv.DictReader(fileobj,
  477.                                     fieldnames=["f1", "f2", "f3"])
  478.             self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
  479.         finally:
  480.             fileobj.close()
  481.             os.unlink(name)
  482.  
  483.     def test_read_dict_no_fieldnames(self):
  484.         fd, name = tempfile.mkstemp()
  485.         fileobj = os.fdopen(fd, "w+b")
  486.         try:
  487.             fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
  488.             fileobj.seek(0)
  489.             reader = csv.DictReader(fileobj)
  490.             self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
  491.         finally:
  492.             fileobj.close()
  493.             os.unlink(name)
  494.  
  495.     def test_read_long(self):
  496.         fd, name = tempfile.mkstemp()
  497.         fileobj = os.fdopen(fd, "w+b")
  498.         try:
  499.             fileobj.write("1,2,abc,4,5,6\r\n")
  500.             fileobj.seek(0)
  501.             reader = csv.DictReader(fileobj,
  502.                                     fieldnames=["f1", "f2"])
  503.             self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
  504.                                              None: ["abc", "4", "5", "6"]})
  505.         finally:
  506.             fileobj.close()
  507.             os.unlink(name)
  508.  
  509.     def test_read_long_with_rest(self):
  510.         fd, name = tempfile.mkstemp()
  511.         fileobj = os.fdopen(fd, "w+b")
  512.         try:
  513.             fileobj.write("1,2,abc,4,5,6\r\n")
  514.             fileobj.seek(0)
  515.             reader = csv.DictReader(fileobj,
  516.                                     fieldnames=["f1", "f2"], restkey="_rest")
  517.             self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
  518.                                              "_rest": ["abc", "4", "5", "6"]})
  519.         finally:
  520.             fileobj.close()
  521.             os.unlink(name)
  522.  
  523.     def test_read_long_with_rest_no_fieldnames(self):
  524.         fd, name = tempfile.mkstemp()
  525.         fileobj = os.fdopen(fd, "w+b")
  526.         try:
  527.             fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
  528.             fileobj.seek(0)
  529.             reader = csv.DictReader(fileobj, restkey="_rest")
  530.             self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
  531.                                              "_rest": ["abc", "4", "5", "6"]})
  532.         finally:
  533.             fileobj.close()
  534.             os.unlink(name)
  535.  
  536.     def test_read_short(self):
  537.         fd, name = tempfile.mkstemp()
  538.         fileobj = os.fdopen(fd, "w+b")
  539.         try:
  540.             fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
  541.             fileobj.seek(0)
  542.             reader = csv.DictReader(fileobj,
  543.                                     fieldnames="1 2 3 4 5 6".split(),
  544.                                     restval="DEFAULT")
  545.             self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
  546.                                              "4": '4', "5": '5', "6": '6'})
  547.             self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
  548.                                              "4": 'DEFAULT', "5": 'DEFAULT',
  549.                                              "6": 'DEFAULT'})
  550.         finally:
  551.             fileobj.close()
  552.             os.unlink(name)
  553.  
  554.     def test_read_multi(self):
  555.         sample = [
  556.             '2147483648,43.0e12,17,abc,def\r\n',
  557.             '147483648,43.0e2,17,abc,def\r\n',
  558.             '47483648,43.0,170,abc,def\r\n'
  559.             ]
  560.  
  561.         reader = csv.DictReader(sample,
  562.                                 fieldnames="i1 float i2 s1 s2".split())
  563.         self.assertEqual(reader.next(), {"i1": '2147483648',
  564.                                          "float": '43.0e12',
  565.                                          "i2": '17',
  566.                                          "s1": 'abc',
  567.                                          "s2": 'def'})
  568.  
  569.     def test_read_with_blanks(self):
  570.         reader = csv.DictReader(["1,2,abc,4,5,6\r\n","\r\n",
  571.                                  "1,2,abc,4,5,6\r\n"],
  572.                                 fieldnames="1 2 3 4 5 6".split())
  573.         self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
  574.                                          "4": '4', "5": '5', "6": '6'})
  575.         self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
  576.                                          "4": '4', "5": '5', "6": '6'})
  577.  
  578.     def test_read_semi_sep(self):
  579.         reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
  580.                                 fieldnames="1 2 3 4 5 6".split(),
  581.                                 delimiter=';')
  582.         self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
  583.                                          "4": '4', "5": '5', "6": '6'})
  584.  
  585. class TestArrayWrites(unittest.TestCase):
  586.     def test_int_write(self):
  587.         import array
  588.         contents = [(20-i) for i in range(20)]
  589.         a = array.array('i', contents)
  590.  
  591.         fd, name = tempfile.mkstemp()
  592.         fileobj = os.fdopen(fd, "w+b")
  593.         try:
  594.             writer = csv.writer(fileobj, dialect="excel")
  595.             writer.writerow(a)
  596.             expected = ",".join([str(i) for i in a])+"\r\n"
  597.             fileobj.seek(0)
  598.             self.assertEqual(fileobj.read(), expected)
  599.         finally:
  600.             fileobj.close()
  601.             os.unlink(name)
  602.  
  603.     def test_double_write(self):
  604.         import array
  605.         contents = [(20-i)*0.1 for i in range(20)]
  606.         a = array.array('d', contents)
  607.         fd, name = tempfile.mkstemp()
  608.         fileobj = os.fdopen(fd, "w+b")
  609.         try:
  610.             writer = csv.writer(fileobj, dialect="excel")
  611.             writer.writerow(a)
  612.             expected = ",".join([str(i) for i in a])+"\r\n"
  613.             fileobj.seek(0)
  614.             self.assertEqual(fileobj.read(), expected)
  615.         finally:
  616.             fileobj.close()
  617.             os.unlink(name)
  618.  
  619.     def test_float_write(self):
  620.         import array
  621.         contents = [(20-i)*0.1 for i in range(20)]
  622.         a = array.array('f', contents)
  623.         fd, name = tempfile.mkstemp()
  624.         fileobj = os.fdopen(fd, "w+b")
  625.         try:
  626.             writer = csv.writer(fileobj, dialect="excel")
  627.             writer.writerow(a)
  628.             expected = ",".join([str(i) for i in a])+"\r\n"
  629.             fileobj.seek(0)
  630.             self.assertEqual(fileobj.read(), expected)
  631.         finally:
  632.             fileobj.close()
  633.             os.unlink(name)
  634.  
  635.     def test_char_write(self):
  636.         import array, string
  637.         a = array.array('c', string.letters)
  638.         fd, name = tempfile.mkstemp()
  639.         fileobj = os.fdopen(fd, "w+b")
  640.         try:
  641.             writer = csv.writer(fileobj, dialect="excel")
  642.             writer.writerow(a)
  643.             expected = ",".join(a)+"\r\n"
  644.             fileobj.seek(0)
  645.             self.assertEqual(fileobj.read(), expected)
  646.         finally:
  647.             fileobj.close()
  648.             os.unlink(name)
  649.  
  650. class TestDialectValidity(unittest.TestCase):
  651.     def test_quoting(self):
  652.         class mydialect(csv.Dialect):
  653.             delimiter = ";"
  654.             escapechar = '\\'
  655.             doublequote = False
  656.             skipinitialspace = True
  657.             lineterminator = '\r\n'
  658.             quoting = csv.QUOTE_NONE
  659.         d = mydialect()
  660.  
  661.         mydialect.quoting = None
  662.         self.assertRaises(csv.Error, mydialect)
  663.  
  664.         mydialect.quoting = csv.QUOTE_NONE
  665.         mydialect.escapechar = None
  666.         self.assertRaises(csv.Error, mydialect)
  667.  
  668.         mydialect.doublequote = True
  669.         mydialect.quoting = csv.QUOTE_ALL
  670.         mydialect.quotechar = '"'
  671.         d = mydialect()
  672.  
  673.         mydialect.quotechar = "''"
  674.         self.assertRaises(csv.Error, mydialect)
  675.  
  676.         mydialect.quotechar = 4
  677.         self.assertRaises(csv.Error, mydialect)
  678.  
  679.     def test_delimiter(self):
  680.         class mydialect(csv.Dialect):
  681.             delimiter = ";"
  682.             escapechar = '\\'
  683.             doublequote = False
  684.             skipinitialspace = True
  685.             lineterminator = '\r\n'
  686.             quoting = csv.QUOTE_NONE
  687.         d = mydialect()
  688.  
  689.         mydialect.delimiter = ":::"
  690.         self.assertRaises(csv.Error, mydialect)
  691.  
  692.         mydialect.delimiter = 4
  693.         self.assertRaises(csv.Error, mydialect)
  694.  
  695.     def test_lineterminator(self):
  696.         class mydialect(csv.Dialect):
  697.             delimiter = ";"
  698.             escapechar = '\\'
  699.             doublequote = False
  700.             skipinitialspace = True
  701.             lineterminator = '\r\n'
  702.             quoting = csv.QUOTE_NONE
  703.         d = mydialect()
  704.  
  705.         mydialect.lineterminator = ":::"
  706.         d = mydialect()
  707.  
  708.         mydialect.lineterminator = 4
  709.         self.assertRaises(csv.Error, mydialect)
  710.  
  711.  
  712. class TestSniffer(unittest.TestCase):
           # Exercises csv.Sniffer (has_header() and sniff()) against the small
           # text samples stored as class attributes below.

           # sample1: fields separated by a comma followed by a space, with no
           # quoting; some fields contain an apostrophe.
  713.     sample1 = """\
  714. Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
  715. Shark City, Glendale Heights, IL, 12/28/02, Prezence
  716. Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
  717. Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
  718. """
           # sample2: the same records, colon-separated, every field wrapped in
           # single quotes and embedded apostrophes written as ''.
  719.     sample2 = """\
  720. 'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
  721. 'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
  722. 'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
  723. 'Stonecutters Seafood and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
  724. """
  725.  
           # header: a double-quoted, comma-separated row of column names that
           # test_has_header prepends to sample1.
  726.     header = '''\
  727. "venue","city","state","date","performers"
  728. '''
           # sample3: identical dates joined by "?"; the characters "/" and "0"
           # also recur on every line, which is what test_delimiters relies on.
  729.     sample3 = '''\
  730. 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
  731. 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
  732. 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
  733. '''
  734.  
           # sample4: semicolon-separated rows mixing numeric-looking and
           # alphabetic fields.
  735.     sample4 = '''\
  736. 2147483648;43.0e12;17;abc;def
  737. 147483648;43.0e2;17;abc;def
  738. 47483648;43.0;170;abc;def
  739. '''
  740.  
  741.     def test_has_header(self):
               # sample1 on its own has no header row; with `header` prepended
               # the sniffer is expected to report one.
  742.         sniffer = csv.Sniffer()
  743.         self.assertEqual(sniffer.has_header(self.sample1), False)
  744.         self.assertEqual(sniffer.has_header(self.header+self.sample1), True)
  745.  
  746.     def test_sniff(self):
               # Checks the delimiter, quotechar and skipinitialspace values
               # that sniff() reports for sample1 and sample2.
  747.         sniffer = csv.Sniffer()
               # Expected for sample1: "," as delimiter, '"' as quotechar (the
               # sample itself contains no double quotes) and
               # skipinitialspace turned on.
  748.         dialect = sniffer.sniff(self.sample1)
  749.         self.assertEqual(dialect.delimiter, ",")
  750.         self.assertEqual(dialect.quotechar, '"')
  751.         self.assertEqual(dialect.skipinitialspace, True)
  752.  
               # Expected for sample2: ":" as delimiter, "'" as quotechar and
               # skipinitialspace turned off.
  753.         dialect = sniffer.sniff(self.sample2)
  754.         self.assertEqual(dialect.delimiter, ":")
  755.         self.assertEqual(dialect.quotechar, "'")
  756.         self.assertEqual(dialect.skipinitialspace, False)
  757.  
  758.     def test_delimiters(self):
               # Checks the delimiter chosen for sample3 with and without the
               # `delimiters` argument, and for sample4 without it.
  759.         sniffer = csv.Sniffer()
               # With no hint, the expected pick for sample3 is the digit "0".
  760.         dialect = sniffer.sniff(self.sample3)
  761.         self.assertEqual(dialect.delimiter, "0")
               # Passing delimiters="?," is expected to yield "?", and
               # delimiters="/," is expected to yield "/".
  762.         dialect = sniffer.sniff(self.sample3, delimiters="?,")
  763.         self.assertEqual(dialect.delimiter, "?")
  764.         dialect = sniffer.sniff(self.sample3, delimiters="/,")
  765.         self.assertEqual(dialect.delimiter, "/")
               # sample4 needs no hint: ";" is the expected result.
  766.         dialect = sniffer.sniff(self.sample4)
  767.         self.assertEqual(dialect.delimiter, ";")
  768.  
  769. if not hasattr(sys, "gettotalrefcount"):
           # The leak tests below call sys.gettotalrefcount(). When this
           # interpreter does not provide it, NUL and TestLeaks are not
           # defined at all; a notice is printed only in verbose mode.
  770.     if test_support.verbose: print "*** skipping leakage tests ***"
  771. else:
  772.     class NUL:
               # Minimal write sink handed to csv.writer: write() accepts any
               # arguments and discards them. The instance parameter is
               # named `s` rather than `self`.
  773.         def write(s, *args):
  774.             pass
               # writelines is the same no-op under a second name.
  775.         writelines = write
  776.  
  777.     class TestLeaks(unittest.TestCase):
               # Each test repeats an operation 20 times and watches the
               # interpreter-wide reference count. `rc` is sampled at the top
               # of every pass, before the operation runs, so `delta` is the
               # growth between the starts of two consecutive passes, i.e.
               # what the previous pass left behind. `delta` is overwritten
               # on every pass; only the value from the final pass is
               # asserted. Statement order matters for these measurements.
  778.         def test_create_read(self):
                   # Creating csv.reader objects must not leak references.
  779.             delta = 0
  780.             lastrc = sys.gettotalrefcount()
  781.             for i in xrange(20):
                       # Collect first, and require that nothing uncollectable
                       # has accumulated in gc.garbage.
  782.                 gc.collect()
  783.                 self.assertEqual(gc.garbage, [])
  784.                 rc = sys.gettotalrefcount()
                       # Three readers are created and immediately dropped.
  785.                 csv.reader(["a,b,c\r\n"])
  786.                 csv.reader(["a,b,c\r\n"])
  787.                 csv.reader(["a,b,c\r\n"])
  788.                 delta = rc-lastrc
  789.                 lastrc = rc
  790.             # if csv.reader() leaks, last delta should be 3 or more
  791.             self.assertEqual(delta < 3, True)
  792.  
  793.         def test_create_write(self):
                   # Creating csv.writer objects must not leak references.
  794.             delta = 0
  795.             lastrc = sys.gettotalrefcount()
                   # One NUL sink is shared by every writer in this test.
  796.             s = NUL()
  797.             for i in xrange(20):
  798.                 gc.collect()
  799.                 self.assertEqual(gc.garbage, [])
  800.                 rc = sys.gettotalrefcount()
                       # Three writers are created and immediately dropped.
  801.                 csv.writer(s)
  802.                 csv.writer(s)
  803.                 csv.writer(s)
  804.                 delta = rc-lastrc
  805.                 lastrc = rc
  806.             # if csv.writer() leaks, last delta should be 3 or more
  807.             self.assertEqual(delta < 3, True)
  808.  
  809.         def test_read(self):
                   # Iterating a reader over its rows must not leak references.
  810.             delta = 0
                   # Five identical input lines are read on every pass.
  811.             rows = ["a,b,c\r\n"]*5
  812.             lastrc = sys.gettotalrefcount()
  813.             for i in xrange(20):
  814.                 gc.collect()
  815.                 self.assertEqual(gc.garbage, [])
  816.                 rc = sys.gettotalrefcount()
  817.                 rdr = csv.reader(rows)
                       # Consume every row; the parsed values are not inspected.
  818.                 for row in rdr:
  819.                     pass
  820.                 delta = rc-lastrc
  821.                 lastrc = rc
  822.             # if reader leaks during read, delta should be 5 or more
  823.             self.assertEqual(delta < 5, True)
  824.  
  825.         def test_write(self):
                   # Writing rows through a writer must not leak references.
  826.             delta = 0
                   # Five references to the same [1, 2, 3] row are written on
                   # every pass.
  827.             rows = [[1,2,3]]*5
  828.             s = NUL()
  829.             lastrc = sys.gettotalrefcount()
  830.             for i in xrange(20):
  831.                 gc.collect()
  832.                 self.assertEqual(gc.garbage, [])
  833.                 rc = sys.gettotalrefcount()
  834.                 writer = csv.writer(s)
  835.                 for row in rows:
  836.                     writer.writerow(row)
  837.                 delta = rc-lastrc
  838.                 lastrc = rc
  839.             # if writer leaks during write, last delta should be 5 or more
  840.             self.assertEqual(delta < 5, True)
  841.  
  842. # commented out for now - csv module doesn't yet support Unicode
  843. ## class TestUnicode(unittest.TestCase):
  844. ##     def test_unicode_read(self):
  845. ##         import codecs
  846. ##         f = codecs.EncodedFile(StringIO("Martin von Löwis,"
  847. ##                                         "Marc André Lemburg,"
  848. ##                                         "Guido van Rossum,"
  849. ##                                         "François Pinard\r\n"),
  850. ##                                data_encoding='iso-8859-1')
  851. ##         reader = csv.reader(f)
  852. ##         self.assertEqual(list(reader), [[u"Martin von Löwis",
  853. ##                                          u"Marc André Lemburg",
  854. ##                                          u"Guido van Rossum",
  855. ##                                          u"François Pinard"]])
  856.  
  857. def test_main():
           # Entry point for the test run: looks up this module object,
           # gathers every module-level attribute whose name starts with
           # 'Test', and passes them all to test_support.run_unittest().
           # Because the lookup goes through dir(), a class that was never
           # defined (TestLeaks, when the leak tests are skipped) is simply
           # not picked up.
  858.     mod = sys.modules[__name__]
  859.     test_support.run_unittest(
  860.         *[getattr(mod, name) for name in dir(mod) if name.startswith('Test')]
  861.     )
  862.  
  863. if __name__ == '__main__':
           # Run the whole suite when this file is executed as a script.
  864.     test_main()
  865.